OPC Studio User's Guide and Reference
OPC Data Observables

OPC DA (Classic) Observables

The DAItemChangedObservable<TValue> class is an observable for changes of one or more OPC Data Access items of type TValue. It represents a data stream with information about significant data changes in the subscribed items.

Each significant change is represented by an OnNext message of type EasyDAItemChangedEventArgs<T>. This message is used both for successes, when the Exception property is a null reference and the Vtq property contains valid data, and for failures, when the Exception property is non-null and contains the failure information.

The OnCompleted and OnError messages (methods of IObserver<T>) are never sent, not even when an error related to the OPC item occurs, so the data stream is never terminated. If your application requires it, you can process the data stream further, for example by filtering it or splitting it by success/failure.
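Splitting the stream by success/failure can be sketched as follows. This is a minimal illustration that uses only the .NET base class library, not the library's own API. ItemChange<T>, FilteringObserver<T> and SplitDemo are hypothetical stand-ins introduced here, and the notifications are simulated instead of coming from an OPC server. With the real observable, the same predicate on the Exception property would be applied to the event arguments received in OnNext.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the event arguments: only the two members discussed
// in the text (an Exception and a value payload) are modeled here.
public sealed class ItemChange<T>
{
    public ItemChange(T value) { Value = value; }
    public ItemChange(Exception exception) { Exception = exception; }

    public Exception Exception { get; }
    public T Value { get; }
    public bool Succeeded => Exception is null;
}

// Minimal observer that forwards only the notifications matching a predicate.
public sealed class FilteringObserver<T> : IObserver<T>
{
    private readonly Func<T, bool> _predicate;
    private readonly Action<T> _onNext;

    public FilteringObserver(Func<T, bool> predicate, Action<T> onNext)
    {
        _predicate = predicate;
        _onNext = onNext;
    }

    public void OnNext(T value) { if (_predicate(value)) _onNext(value); }
    public void OnError(Exception error) { }   // not expected: failures arrive through OnNext
    public void OnCompleted() { }              // not expected: the stream is not terminated
}

public static class SplitDemo
{
    public static (List<double> Values, List<string> Errors) Run()
    {
        var values = new List<double>();
        var errors = new List<string>();

        // One observer keeps the successes, the other keeps the failures.
        var onSuccess = new FilteringObserver<ItemChange<double>>(
            e => e.Succeeded, e => values.Add(e.Value));
        var onFailure = new FilteringObserver<ItemChange<double>>(
            e => !e.Succeeded, e => errors.Add(e.Exception.Message));

        // Simulated notifications; with a real observable, both observers
        // would be passed to Subscribe instead of being called directly.
        var notifications = new[]
        {
            new ItemChange<double>(1.5),
            new ItemChange<double>(new InvalidOperationException("item not available")),
            new ItemChange<double>(2.5),
        };
        foreach (var notification in notifications)
        {
            onSuccess.OnNext(notification);
            onFailure.OnNext(notification);
        }

        return (values, errors);
    }

    public static void Main()
    {
        var (values, errors) = Run();
        Console.WriteLine($"Successes: {values.Count}, failures: {errors.Count}");
    }
}
```

Because failures are delivered as ordinary OnNext messages, both branches keep receiving notifications after an error; nothing has to be resubscribed.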

For OPC Classic, you can create instances of DAItemChangedObservable<TValue> either by using its constructor, or by using the static DAItemChangedObservable class, which provides several overloads of the Create method. The static DAItemChangedObservable.Create methods use the default underlying EasyDAClient object for OPC reactive extensions. If you need to set some parameters in the client object, you can use the ClientSelector property to specify them.

This approach allows the code to be expressed purely in terms of OPC logic, without being tied to the way it is actually implemented.

The following code fragment creates the observable using one of the Create methods:

    DAItemChangedObservable<double> ramp =
        DAItemChangedObservable.Create<double>(
            "", "OPCLabs.KitServer.2", "Demo.Ramp", 1000);

 

It is recommended that you create the instances using the DAItemChangedObservable.Create method unless you have special needs. 

Examples

// Shows how to create an observable for OPC-DA item changes, and subscribe to it.
//
// Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-OpcStudio/Latest/examples.html .

using System;
using System.Threading;
using OpcLabs.EasyOpc.DataAccess.Reactive;

namespace ReactiveDocExamples
{
    namespace _DAItemChangedObservable
    {
        partial class Subscribe
        {
            public static void Main1()
            {
                Console.WriteLine("Creating observable...");
                DAItemChangedObservable<double> ramp = 
                    DAItemChangedObservable.Create<double>("", "OPCLabs.KitServer.2", "Demo.Ramp", 1000);

                Console.WriteLine("Subscribing...");
                using (ramp.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.Vtq.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to create an observable for OPC-DA item changes, and subscribe to it with percent deadband.

using System;
using System.Threading;
using OpcLabs.EasyOpc.DataAccess.Reactive;

namespace ReactiveDocExamples
{
    namespace _DAItemChangedObservable
    {
        partial class Subscribe
        {
            public static void PercentDeadband()
            {
                const float percentDeadband = 5.0f;
                Console.WriteLine($"Creating observable with {percentDeadband}% deadband...");
                DAItemChangedObservable<double> ramp = 
                    DAItemChangedObservable.Create<double>("", "OPCLabs.KitServer.2", "Simulation.Ramp 0:100 (10 s)", 
                        requestedUpdateRate:100, percentDeadband:percentDeadband);

                Console.WriteLine("Subscribing...");
                using (ramp.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.Vtq.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

OPC UA (Data) Observables

The UADataChangeNotificationObservable<TValue> class is an observable for changes of one or more OPC-UA monitored items of type TValue. It represents a data stream with information about significant data changes in the subscribed items.

Each significant change is represented by an OnNext message of type EasyUADataChangeNotificationEventArgs<T>. This message is used both for successes, when the Exception property is a null reference and the AttributeData property contains valid data, and for failures, when the Exception property is non-null and contains the failure information.

The OnCompleted and OnError messages (methods of IObserver<T>) are never sent, not even when an error related to the OPC item occurs, so the data stream is never terminated. If your application requires it, you can process the data stream further, for example by filtering it or splitting it by success/failure.

For OPC-UA, you can create instances of UADataChangeNotificationObservable<TValue> either by using its constructor, or by using the static UADataChangeNotificationObservable class, which provides several overloads of the Create method. The static UADataChangeNotificationObservable.Create methods use the default underlying EasyUAClient object for OPC reactive extensions. If you need to set some parameters in the client object, you can use the ClientSelector property to specify them.

This approach allows the code to be expressed purely in terms of OPC logic, without being tied to the way it is actually implemented.

It is recommended that you create the instances using the UADataChangeNotificationObservable.Create method unless you have special needs.

Examples

// Shows how to create an observable for OPC-UA monitored item changes, and subscribe to it.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void Main1()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853", 1000);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to observe OPC UA data changes with a specified data change filter.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void DataChangeTrigger()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                var attributeArguments = new UAAttributeArguments(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853");
                // Report a notification if either the StatusCode or the value changes.
                var monitoredItemArguments = new UAMonitoredItemArguments(attributeArguments,
                    new UAMonitoringParameters(samplingInterval: 1000, dataChangeFilter: UADataChangeTrigger.StatusValue));
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to observe OPC UA data changes with a specified absolute deadband.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void AbsoluteDeadband()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                const double absoluteDeadband = 50;
                Console.WriteLine($"Creating observable with absolute deadband {absoluteDeadband}...");
                var attributeArguments = new UAAttributeArguments(
                    endpointDescriptor,
                    "nsu=http://test.org/UA/Data/ ;i=11194");    // /Data.Dynamic.AnalogScalar.Int32Value
                var monitoredItemArguments = new UAMonitoredItemArguments(
                    attributeArguments,
                    new UAMonitoringParameters(samplingInterval: 1000, dataChangeFilter: absoluteDeadband));
                UADataChangeNotificationObservable<int> observable =
                    UADataChangeNotificationObservable.Create<int>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to observe OPC UA data changes with a specified percent deadband.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void PercentDeadband()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                const double percentDeadband = 5.0;
                Console.WriteLine($"Creating observable with {percentDeadband}% deadband...");
                var attributeArguments = new UAAttributeArguments(
                    endpointDescriptor,
                    "nsu=http://test.org/UA/Data/ ;i=11194");    // /Data.Dynamic.AnalogScalar.Int32Value
                var monitoredItemArguments = new UAMonitoredItemArguments(
                    attributeArguments,
                    new UAMonitoringParameters(
                        samplingInterval: 1000,
                        dataChangeFilter: new UADataChangeFilter(UADeadbandType.Percent, percentDeadband)));
                UADataChangeNotificationObservable<int> observable =
                    UADataChangeNotificationObservable.Create<int>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows how to create an observable for a range of OPC UA array elements, and subscribe to it.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.OperationModel;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void IndexRangeList()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                var attributeArguments = new UAAttributeArguments(endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10933")
                {
                    IndexRangeList = UAIndexRangeList.OneDimension(2, 4)
                };
                var monitoredItemArguments = new UAMonitoredItemArguments(attributeArguments, monitoringParameters: 1000);
                UADataChangeNotificationObservable<Int32[]> observable =
                    UADataChangeNotificationObservable.Create<Int32[]>(monitoredItemArguments);

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

// Shows an OPC UA data change observable with specified timeouts.

using OpcLabs.EasyOpc.UA.Reactive;
using System;
using System.Threading;
using OpcLabs.EasyOpc.UA;

namespace ReactiveDocExamples
{
    namespace _UADataChangeNotificationObservable
    {
        partial class Subscribe
        {
            public static void Timeouts()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating observable...");
                UADataChangeNotificationObservable<float> observable =
                    UADataChangeNotificationObservable.Create<float>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10853", 1000);
                // Set fairly short timeouts (failure can thus occur).
                observable.ClientSelector.Isolated = true;
                observable.ClientSelector.IsolatedParameters.SessionParameters.EndpointSelectionTimeout = 1500;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionConnectTimeout = 3000;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionTimeout = 3000;
                observable.ClientSelector.IsolatedParameters.SessionParameters.SessionTimeoutDebug = 3000;

                Console.WriteLine("Subscribing...");
                using (observable.Subscribe(e => Console.WriteLine(
                    (e.Exception is null) ? e.AttributeData.ToString() : e.Exception.GetBaseException().ToString())))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10*1000);

                    Console.WriteLine("Unsubscribing...");
                }

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}

 

See Also

Examples - Reactive Programming